package spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StopGracefully {
  val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status: Option[Int]) => {

    //当前批次内容的计算
    val sum: Int = values.sum

    //取出状态信息中上一次状态
    val lastState: Int = status.getOrElse(0)

    Some(sum + lastState)
  }

  private def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {

    val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("StopGracefully")

    //设置优雅的关闭
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")

    val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(5))

    ssc.checkpoint("./ck")

    val line: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)

    val word: DStream[String] = line.flatMap(_.split(" "))

    val wordAndOne: DStream[(String, Int)] = word.map((_, 1))

    val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update)

    wordAndCount.print()

    ssc
  }

  def main(args: Array[String]): Unit = {

    // 从检查点启动 — — 恢复状态
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate(
      "./ck",
      () => createSSC()
    )

    // 优雅的关闭操作 — — 内部逻辑需要去类中获取
    new Thread(new MonitorStop(ssc)).start()

    ssc.start()
    ssc.awaitTermination()
  }
}
